package com.free.bsf.elasticsearch.impl;

import com.free.bsf.core.util.ConvertUtils;
import com.free.bsf.core.util.HttpClientUtils;
import com.free.bsf.core.util.JsonUtils;
import com.free.bsf.core.util.ReflectionUtils;
import com.free.bsf.elasticsearch.ElasticSearchProperties;
import com.free.bsf.elasticsearch.base.ElasticSearchException;
import com.free.bsf.elasticsearch.base.Param;
import lombok.Data;
import lombok.val;
import lombok.var;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;

import java.lang.reflect.Type;
import java.util.*;

public class ElasticSearchRestSqlProvider extends ElasticSearchSqlProvider {
	/** Class token for {@link HashMap}; used when reading untyped JSON objects and when inspecting cell values. */
	private static Class mapClass = HashMap.class;
	/** Address of the {@code _sql} endpoint every statement is posted to. */
	private String url;

	/**
	 * Creates a provider that talks to the REST {@code _sql} endpoint.
	 * The parent is initialised with the properties and two {@code null} arguments.
	 *
	 * @param elasticSearchProperties configuration; {@code getRestServerAddrs()} supplies the base address,
	 *                                whose leading and trailing {@code /} characters are stripped before
	 *                                {@code /_sql} is appended
	 */
	public ElasticSearchRestSqlProvider(ElasticSearchProperties elasticSearchProperties){
		super(elasticSearchProperties,null,null);
		String baseAddress = StringUtils.strip(elasticSearchProperties.getRestServerAddrs(), "/");
		this.url = baseAddress + "/_sql";
	}

	/**
	 * Posts the SQL text to the {@code _sql} endpoint and returns the response as a string.
	 * The statement is sent as the raw request body (UTF-8) with JSON {@code Accept} and
	 * {@code Content-Type} headers.
	 *
	 * @param sql the statement to send
	 * @return whatever the shared HTTP helper's {@code toString(response)} yields — presumably the response body
	 * @throws ElasticSearchException wrapping any exception raised while executing or reading the call
	 */
	private String request(String sql){
		final HttpPost httpPost = new HttpPost(url);
		httpPost.setHeader("Accept", "application/json");
		httpPost.setHeader("Content-Type", "application/json");
		httpPost.setEntity(new StringEntity(sql, "UTF-8"));
		val httpPool = HttpClientUtils.system();
		// The response is closed automatically, whether reading it succeeds or fails.
		try (val response = httpPool.getClient().execute(httpPost)) {
			return httpPool.toString(response);
		} catch (Exception e) {
			throw new ElasticSearchException(e);
		}
	}

	/**
	 * Variant of {@link #searchListBySql(String, Class)} that first passes the SQL through
	 * {@code param.build(sql)} (presumably binding the parameter values into the statement).
	 *
	 * @param sql   the SQL template
	 * @param param builder applied to {@code sql}; must not be {@code null}
	 * @param clazz element type of the returned list
	 * @param <T>   element type
	 * @return the rows mapped onto {@code clazz}
	 */
	public <T> List<T> searchListBySql(String sql,Param param, Class<T> clazz){
		final String builtSql = param.build(sql);
		return searchListBySql(builtSql, clazz);
	}

	/**
	 * Executes a SQL query and maps every result row onto {@code clazz}.
	 * The call runs inside the monitor hook named {@code "searchListBySql"} and inside {@code pintSql}.
	 * The response is parsed into a table, re-serialised to JSON and then deserialised as a list of
	 * {@code clazz}.
	 *
	 * @param sql   the statement to execute
	 * @param clazz element type of the returned list
	 * @param <T>   element type
	 * @return the mapped rows
	 * @throws ElasticSearchException carrying the raw response text when no table can be built from it
	 */
	public <T> List<T> searchListBySql(String sql, Class<T> clazz) {
		return ElasticSearchMonitor.hook().run("searchListBySql", () -> {
			return pintSql(sql,()->{
				final String responseJson = request(sql);
				final EsResult parsed = (EsResult)fromJson(responseJson, EsResult.class);
				final DataTable rows = parse(parsed);
				if (rows != null) {
					return fromJson(toJson(rows),getListType(clazz));
				}
				throw new ElasticSearchException(responseJson);
			});
		});
	}

	/**
	 * Variant of {@link #searchObjectBySql(String, Class)} that first passes the SQL through
	 * {@code param.build(sql)} (presumably binding the parameter values into the statement).
	 *
	 * @param sql   the SQL template
	 * @param param builder applied to {@code sql}; must not be {@code null}
	 * @param clazz type the single value is converted to
	 * @param <T>   result type
	 * @return the converted value, or {@code null}
	 */
	public <T> T searchObjectBySql(String sql,Param param, Class<T> clazz){
		final String builtSql = param.build(sql);
		return searchObjectBySql(builtSql, clazz);
	}

	/**
	 * Executes a SQL query and returns a single value: the first cell of the first result row,
	 * converted to {@code clazz}. The call runs inside the monitor hook named
	 * {@code "searchObjectBySql"} and inside {@code pintSql}.
	 * <p>
	 * "First cell" means the first key in the row's own iteration order. Rows are
	 * {@code HashMap}-based, so for multi-column results this is not necessarily the first
	 * column of the query; the method is intended for single-column results.
	 *
	 * @param sql   the statement to execute
	 * @param clazz type the value is converted to
	 * @param <T>   result type
	 * @return the converted value, or {@code null} when there is no row or the first row has no columns
	 * @throws ElasticSearchException carrying the raw response text when no table can be built from it
	 */
	public <T> T searchObjectBySql(String sql, Class<T> clazz) {
		return ElasticSearchMonitor.hook().run("searchObjectBySql", () -> {
			return pintSql(sql,()-> {
				val js = request(sql);
				val result = (EsResult) fromJson(js, EsResult.class);
				val dataTable = parse(result);
				if (dataTable == null)
					throw new ElasticSearchException(js);
				if (dataTable.isEmpty())
					return null;
				DataRow firstRow = dataTable.get(0);
				// A row built from an empty _source has no keys; report "no value"
				// instead of failing with NoSuchElementException.
				if (firstRow == null || firstRow.isEmpty())
					return null;
				String firstKey = firstRow.keySet().iterator().next();
				return ConvertUtils.convert(firstRow.get(firstKey), clazz);
			});
		});
	}

	/**
	 * Variant of {@link #deleteBySql(String)} that first passes the SQL through
	 * {@code param.build(sql)}, in the same way as the parameterised search overloads.
	 * <p>
	 * Delegates to the single-argument overload; calling this two-argument method again
	 * with the same arguments would recurse without end.
	 *
	 * @param sql   the delete statement template
	 * @param param builder applied to {@code sql}; must not be {@code null}
	 */
	public void deleteBySql(String sql,Param param){
		deleteBySql(param.build(sql));
	}

	/**
	 * Executes a delete statement. The call runs inside the monitor hook named
	 * {@code "deleteBySql"} and inside {@code pintSql}. The response is read as a map and is
	 * treated as successful only when it contains a {@code deleted} key.
	 *
	 * @param sql the delete statement to execute
	 * @throws ElasticSearchException carrying the raw response text when the response has no {@code deleted} key
	 */
	public void deleteBySql(String sql) {
		ElasticSearchMonitor.hook().run("deleteBySql", () -> {
			pintSql(sql,()-> {
				final String responseJson = request(sql);
				final HashMap<String, Object> body = (HashMap<String, Object>) fromJson(responseJson, mapClass);
				if (!body.containsKey("deleted")) {
					throw new ElasticSearchException(responseJson);
				}
				return true;
			});
		});
	}

	/**
	 * No-op: this subclass declares only the endpoint URL and a class token as fields,
	 * so it has nothing of its own to release.
	 */
	@Override
	public void close() {
		// intentionally empty
	}



	/**
	 * Deserialisation target for the JSON returned by the {@code _sql} endpoint.
	 * Field names intentionally mirror the JSON keys (including underscores).
	 */
	@Data
	private static class EsResult{
		/** The {@code hits} section; may be absent. */
		Hits hits;
		/** The {@code aggregations} section keyed by aggregation name; may be absent. */
		Map<String, aggregation> aggregations;
		boolean timed_out;
		long took;
		Shards _shards;

		/**
		 * Collects the {@code _source} document of every hit.
		 * Hits that are {@code null} or carry no {@code _source} are skipped so callers
		 * never have to deal with {@code null} entries.
		 *
		 * @return a new list of source documents; empty (never {@code null}) when there are no hits
		 */
		public List<Map<String,Object>> getSources() {
			List<Map<String,Object>> sources = new ArrayList<>();
			if (hits == null || hits.hits == null) {
				return sources;
			}
			for (hit h : hits.hits) {
				if (h != null && h._source != null) {
					sources.add(h._source);
				}
			}
			return sources;
		}

		/** Value part shared by metric aggregations. */
		@Data
		private  static  class aggvalue
		{
			 Object value;
			 String value_as_string;
		}

		/** One named aggregation: either a metric ({@code value}) or a bucket list ({@code buckets}). */
		@Data
		private  static  class aggregation extends aggvalue
		{
			List<HashMap<String,Object>> buckets;
			long doc_count_error_upper_bound;
			long sum_other_doc_count;
		}

		/** The {@code hits} section of the response. */
		@Data
		private  static  class Hits {
			// Elasticsearch 7+ reports hits.total as an object ({"value":..,"relation":..}) while
			// older versions send a plain number; Object accepts both shapes.
			Object total;
			// max_score is a floating-point number and may be null (e.g. when sorting on a field).
			Double max_score;
			List<hit> hits;
		}

		/** A single search hit. */
		@Data
		private  static  class hit {
			String _index;
			String _type;
			String _id;
			String _score;
			Map<String,Object> _source;
		}

		/** The {@code _shards} section of the response. */
		@Data
		private  static  class Shards{
			long failed;
			long skipped;
			long successful;
			long total;
		}

	}
	/**
	 * A list of rows plus the set of column names the rows share.
	 * <p>
	 * Deliberately not annotated with Lombok {@code @Data}: on a list subclass the generated
	 * {@code equals}/{@code hashCode}/{@code toString} would look only at {@code columns} and
	 * ignore the rows. Without it the {@link ArrayList} implementations apply.
	 */
	private static  class DataTable extends ArrayList<DataRow>{
		/** Column names (keys); the mapped type is currently always stored as {@code null}. */
		private final Map<String,Type> columns=new HashMap<>();

		/**
		 * @return the live, mutable column map of this table
		 */
		public Map<String,Type> getColumns(){
			return columns;
		}

		/**
		 * Creates a detached row that has every known column preset to {@code null}.
		 * The row is not added to the table.
		 *
		 * @return the new row
		 */
		public DataRow newRow(){
			DataRow row = new DataRow();
			for (String columnName : columns.keySet()) {
				row.put(columnName, null);
			}
			return row;
		}

		/**
		 * @return this table itself, viewed as its list of rows
		 */
		public DataTable getRows(){
			return this;
		}

	}
	@Data
	private static  class DataRow extends HashMap<String,Object>{

	}

	/**
	 * Flattens a response into a table.
	 * <ul>
	 *   <li>If the response has aggregations, only the first aggregation decides the shape:
	 *       a non-empty bucket list yields one row per bucket (group-by), otherwise a non-null
	 *       {@code value} yields a single row holding every aggregation (plain metrics).
	 *       If neither applies the result is {@code null}; hits are not consulted in that case.</li>
	 *   <li>Otherwise the hit sources yield one row per document.</li>
	 * </ul>
	 *
	 * @param result the deserialised response; may be {@code null}
	 * @return the table, or {@code null} when nothing tabular can be derived from the response
	 */
	private DataTable parse(EsResult result){
		if (result == null)
			return null;
		if (result.aggregations != null && !result.aggregations.isEmpty())
		{
			Map.Entry<String, EsResult.aggregation> firstEntry = result.aggregations.entrySet().iterator().next();
			EsResult.aggregation first = firstEntry.getValue();
			if (first == null)
				return null;
			// group-by style aggregation
			if (first.buckets != null && !first.buckets.isEmpty())
				return bucketsToTable(firstEntry.getKey(), first.buckets);
			// plain metric aggregations
			if (first.value != null)
				return metricsToTable(result.aggregations);
			return null;
		}
		// getSources() builds a fresh list on every call, so fetch it once.
		return sourcesToTable(result.getSources());
	}

	/** Builds one row per bucket; the bucket's {@code key} is stored under the aggregation name. */
	private DataTable bucketsToTable(String groupColumn, List<HashMap<String,Object>> buckets){
		DataTable dataTable = new DataTable();
		// The first bucket is the reference for the column set; doc_count and key are not columns.
		dataTable.getColumns().put(groupColumn, null);
		for (String name : buckets.get(0).keySet())
		{
			if (!"doc_count".equals(name) && !"key".equals(name))
				dataTable.getColumns().put(name, null);
		}
		// Fill in the values.
		for (HashMap<String,Object> bucket : buckets)
		{
			DataRow dataRow = dataTable.newRow();
			dataRow.put(groupColumn, bucket.get("key"));
			for (Map.Entry<String,Object> cell : bucket.entrySet())
			{
				if (dataTable.getColumns().containsKey(cell.getKey()))
					dataRow.put(cell.getKey(), getValue(cell.getValue()));
			}
			dataTable.add(dataRow);
		}
		return dataTable;
	}

	/** Builds a single row with one column per aggregation. */
	private DataTable metricsToTable(Map<String, EsResult.aggregation> aggregations){
		DataTable dataTable = new DataTable();
		for (String name : aggregations.keySet())
		{
			dataTable.getColumns().put(name, null);
		}
		DataRow dataRow = dataTable.newRow();
		for (Map.Entry<String, EsResult.aggregation> metric : aggregations.entrySet())
		{
			dataRow.put(metric.getKey(), getValue(metric.getValue()));
		}
		dataTable.add(dataRow);
		return dataTable;
	}

	/** Builds one row per source document; columns come from the first non-null document. */
	private DataTable sourcesToTable(List<Map<String,Object>> sources){
		if (sources == null)
			return null;
		Map<String,Object> first = null;
		for (Map<String,Object> source : sources)
		{
			if (source != null)
			{
				first = source;
				break;
			}
		}
		if (first == null)
			return null;
		DataTable dataTable = new DataTable();
		for (String name : first.keySet())
		{
			dataTable.getColumns().put(name, null);
		}
		for (Map<String,Object> source : sources)
		{
			// A hit without _source carries no column data; skip it instead of failing.
			if (source == null)
				continue;
			DataRow dataRow = dataTable.newRow();
			for (Map.Entry<String,Object> cell : source.entrySet())
			{
				// Fields that the first document does not have are dropped.
				if (dataTable.getColumns().containsKey(cell.getKey()))
					dataRow.put(cell.getKey(), getValue(cell.getValue()));
			}
			dataTable.add(dataRow);
		}
		return dataTable;
	}

	/**
	 * Normalises a raw response value into a table cell.
	 * <ul>
	 *   <li>{@code null} stays {@code null} (how {@code JsonUtils.serialize} renders {@code null}
	 *       is not visible here, so it is not relied upon).</li>
	 *   <li>Strings, numbers, booleans and characters are returned unchanged.</li>
	 *   <li>Collections and arrays are returned as their JSON text.</li>
	 *   <li>Maps are unwrapped via their {@code value_as_string} entry, else their {@code value}
	 *       entry, when present.</li>
	 *   <li>Objects declaring a {@code value} field are unwrapped via a non-null
	 *       {@code value_as_string} field, else a non-null {@code value} field.</li>
	 *   <li>Anything else is returned as its JSON text.</li>
	 * </ul>
	 *
	 * @param value the raw value; may be {@code null}
	 * @return the normalised cell value
	 */
	private Object getValue(Object value)
	{
		if (value == null)
			return null;
		// An Object reference is never of a primitive class (primitives arrive boxed), so test the
		// wrapper types explicitly rather than reaching the boxed value through reflection.
		if (value instanceof String || value instanceof Number
				|| value instanceof Boolean || value instanceof Character)
			return value;
		if (value instanceof Collection || value.getClass().isArray())
			return JsonUtils.serialize(value);
		if (value instanceof Map)
		{
			Map<?,?> map = (Map<?,?>) value;
			if (map.containsKey("value_as_string"))
				return getValue(map.get("value_as_string"));
			if (map.containsKey("value"))
				return getValue(map.get("value"));
		}
		if (ReflectionUtils.findField(value.getClass(),"value") != null)
		{
			if (ReflectionUtils.findField(value.getClass(),"value_as_string") != null)
			{
				Object asString = ReflectionUtils.tryGetFieldValue(value,"value_as_string",null);
				if (asString != null)
					return asString;
			}
			Object raw = ReflectionUtils.tryGetFieldValue(value,"value",null);
			if (raw != null)
				return raw;
		}
		return JsonUtils.serialize(value);
	}
}
